%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_slow_subs_api).

-behaviour(minirest_api).

-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").

-export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).

-export([slow_subs/2, get_history/0, settings/2]).

-import(hoconsc, [mk/2, ref/1]).
-import(emqx_mgmt_util, [bad_request/0]).

-define(APP, emqx_slow_subs).
-define(APP_NAME, <<"emqx_slow_subs">>).
-define(DEFAULT_RPC_TIMEOUT, timer:seconds(5)).

%% @doc Returns the namespace string used for this module's schema.
namespace() ->
    "slow_subscribers_statistics".

%% @doc Builds the API specification for this module by delegating to
%% emqx_dashboard_swagger:spec/1 and returns whatever that call returns.
api_spec() ->
    Spec = emqx_dashboard_swagger:spec(?MODULE),
    Spec.

%% @doc Lists the URL paths served by this module; each one has a matching
%% schema/1 clause.
paths() ->
    [ "/slow_subscriptions"
    , "/slow_subscriptions/settings"
    ].

%% @doc Returns the operation schema for one of the paths listed by paths/0.
%%
%% "/slow_subscriptions" is bound to the slow_subs/2 handler and describes
%% `delete' (204) and `get' (200, a `data' array of `record' structs, with
%% integer `page' and `limit' query parameters).
%%
%% "/slow_subscriptions/settings" is bound to the settings/2 handler and
%% describes `get' and `put', both answering 200 with the schema built by
%% conf_schema/0; `put' also takes that schema as its request body.
schema("/slow_subscriptions") ->
    Tags = [<<"slow subs">>],
    PageParam = mk(integer(), #{in => query}),
    LimitParam = mk(integer(), #{in => query}),
    RecordArray = mk(hoconsc:array(ref(record)), #{}),
    Delete = #{ tags => Tags
              , description => <<"Clear current data and re count slow topic">>
              , parameters => []
              , 'requestBody' => []
              , responses => #{204 => <<"No Content">>}
              },
    Get = #{ tags => Tags
           , description => <<"Get slow topics statistics record data">>
           , parameters => [{page, PageParam}, {limit, LimitParam}]
           , 'requestBody' => []
           , responses => #{200 => [{data, RecordArray}]}
           },
    #{ 'operationId' => slow_subs
     , delete => Delete
     , get => Get
     };

schema("/slow_subscriptions/settings") ->
    Tags = [<<"slow subs">>],
    Conf = conf_schema(),
    Get = #{ tags => Tags
           , description => <<"Get slow subs settings">>
           , responses => #{200 => Conf}
           },
    Put = #{ tags => Tags
           , description => <<"Update slow subs settings">>
           , 'requestBody' => Conf
           , responses => #{200 => Conf}
           },
    #{ 'operationId' => settings
     , get => Get
     , put => Put
     }.

%% @doc Field definitions of the `record' struct referenced from schema/1.
%% Each entry is `{Name, mk(Type, #{desc => Description})}', in the order
%% clientid, node, topic, timespan, last_update_time.
fields(record) ->
    Specs = [ {clientid, string(), <<"the clientid">>}
            , {node, string(), <<"the node">>}
            , {topic, string(), <<"the topic">>}
            , {timespan, integer(), <<"timespan for message transmission">>}
            , {last_update_time, integer(), <<"the timestamp of last update">>}
            ],
    [{Name, mk(Type, #{desc => Desc})} || {Name, Type, Desc} <- Specs].

%% Schema used for the settings request and response bodies: a reference to
%% "slow_subs" in emqx_slow_subs_schema, wrapped by mk/2 with no extra options.
conf_schema() ->
    mk(hoconsc:ref(emqx_slow_subs_schema, "slow_subs"), #{}).

%% @doc Handler for "/slow_subscriptions".
%%
%% `delete' calls emqx_slow_subs_proto_v1:clear_history/1 with the running
%% nodes, ignores its result and replies `{204}'.
%%
%% `get' calls emqx_slow_subs_proto_v1:get_history/1 with the running nodes,
%% keeps the payload of every `{ok, List}' reply (any other reply is dropped),
%% orders the merged entries by `timespan', largest first, and replies 200
%% with at most ?MAX_SIZE of them. The request argument is not inspected, so
%% the `page' and `limit' query parameters declared in schema/1 have no effect.
slow_subs(delete, _) ->
    _ = rpc_call(fun emqx_slow_subs_proto_v1:clear_history/1),
    {204};

slow_subs(get, _) ->
    Replies = rpc_call(fun emqx_slow_subs_proto_v1:get_history/1),
    %% Only successful replies contribute records.
    PerNode = [Records || {ok, Records} <- Replies],
    %% Later replies are prepended, matching the original merge order.
    Merged = lists:foldl(fun(Records, Acc) -> Records ++ Acc end, [], PerNode),
    BySlowest = fun(#{timespan := X}, #{timespan := Y}) -> X > Y end,
    Ranked = lists:sort(BySlowest, Merged),
    {200, lists:sublist(Ranked, ?MAX_SIZE)}.

%% @doc Dumps the ?TOPK_TAB ETS table on the local node as a list of maps.
%%
%% Every `#top_k{}' entry becomes a map with the keys `clientid', `node'
%% (always the local node()), `topic', `timespan' and `last_update_time'.
%% An entry that does not match the expected record shape crashes the call.
%% NOTE(review): presumably the per-node target of
%% emqx_slow_subs_proto_v1:get_history/1 — confirm.
get_history() ->
    ThisNode = node(),
    ToMap = fun(#top_k{index = ?TOPK_INDEX(Span, ?ID(Client, SubTopic)),
                       last_update_time = UpdatedAt}) ->
                    #{ clientid => Client
                     , node => ThisNode
                     , topic => SubTopic
                     , timespan => Span
                     , last_update_time => UpdatedAt
                     }
            end,
    [ToMap(Entry) || Entry <- ets:tab2list(?TOPK_TAB)].

%% @doc Handler for "/slow_subscriptions/settings".
%%
%% `get' replies 200 with emqx:get_raw_config([slow_subs], #{}).
%%
%% `put' hands the request body to emqx_slow_subs:update_settings/1. An
%% `{error, Reason}' result is answered with 400 and a BAD_REQUEST body
%% instead of being silently discarded; any other result is answered with 200
%% and the raw config re-read after the update.
settings(get, _) ->
    {200, emqx:get_raw_config([slow_subs], #{})};

settings(put, #{body := Body}) ->
    case emqx_slow_subs:update_settings(Body) of
        {error, Reason} ->
            %% NOTE(review): assumes update_settings/1 reports failure as
            %% {error, Reason} — confirm against emqx_slow_subs.
            Message = unicode:characters_to_binary(
                        io_lib:format("Update slow subs config failed: ~p", [Reason])),
            {400, #{code => <<"BAD_REQUEST">>, message => Message}};
        _Updated ->
            {200, emqx:get_raw_config([slow_subs], #{})}
    end.

%% Applies Fun to the node list returned by mria_mnesia:running_nodes/0 and
%% returns Fun's result unchanged.
rpc_call(Fun) ->
    Fun(mria_mnesia:running_nodes()).
